[SPARK-56284] Adding UDF worker specification protobuf definition #55165
sven-weber-db wants to merge 1 commit into apache:master
Conversation
cloud-fan left a comment
Summary
This PR replaces the placeholder UDFWorkerSpecification protobuf message with the full worker specification schema per SPIP SPARK-55278.
Design approach: Two proto files define a layered worker specification:
- common.proto — shared types for reuse by both the worker spec and the forthcoming UDF protocol: UDFWorkerDataFormat (data serialization format), UDFShape/SparkUDFShapes (UDF execution shapes).
- worker_spec.proto — the full specification: UDFWorkerSpecification composes WorkerEnvironment (lifecycle callables), WorkerCapabilities (data formats, UDF shapes, concurrency/reuse flags), and a DirectWorker (process callable + connection + timeout properties). Transport is abstracted via WorkerConnection (oneof of Unix domain socket or TCP).
Key design decisions:
- ProcessCallable separates command (executable prefix) from arguments, with the engine injecting --id and --connection at invocation time.
- oneof worker in UDFWorkerSpecification and oneof transport in WorkerConnection provide extension points for future worker provisioning strategies and transport types.
- WorkerCapabilities.supports_concurrent_udfs is defined but explicitly deferred for future use.
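The decisions above can be sketched roughly as follows. Message and field names come from this summary; the field numbers and comment wording are assumptions, not the merged definitions:

```protobuf
// Illustrative sketch only; field numbers are assumptions.
message ProcessCallable {
  // Executable prefix, e.g. ["python3", "-m"].
  repeated string command = 1;
  // User-supplied arguments. The engine injects --id and
  // --connection at invocation time, so both are reserved.
  repeated string arguments = 2;
}

message UDFWorkerSpecification {
  WorkerEnvironment environment = 1;
  WorkerCapabilities capabilities = 2;

  // Extension point for future worker provisioning strategies.
  oneof worker {
    DirectWorker direct_worker = 3;
  }
}
```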
General comments:
- udf/worker/README.md (line 23) still says "UDFWorkerSpecification -- currently a placeholder" — should be updated now that the specification is filled in.
- Spark Connect protos use (Required)/(Optional) annotations on field comments to clarify the application-level contract. For fields like supported_data_formats (where the comment says "Every worker MUST at least support ARROW"), such annotations would make the requirement immediately visible to proto consumers.
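For illustration, the suggested annotation style on such a field might look like this; the field name comes from the review, while its number and placement are assumed:

```protobuf
// (Required) The data formats this worker supports.
// Every worker MUST at least support ARROW.
repeated UDFWorkerDataFormat supported_data_formats = 1;
```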
    // engine-configurable maximum time (e.g. 30 seconds).
    optional int32 graceful_termination_timeout_ms = 2;

    // The connection this [[UDFWorker]] supports. Note that a single
[[UDFWorker]] is not defined anywhere — no proto message, no Scala/Java class. The same dangling reference appears at lines 149 and 159. The closest entity is DirectWorker (line 101). Should these reference DirectWorker, or is UDFWorker a planned type not yet introduced?
Good catch! [[UDFWorker]] was the name we previously used for DirectWorker. Before raising this PR, it was renamed, and I seem to have forgotten to update all references in the text to the old name. This should be fixed now. Thank you!
    // ["\"echo 'Test'\""]
    //
    // Every executable will ALWAYS receive a
    // --id argument. This argument CANNOT be part of the below list of arguments.
The --id argument is explicitly reserved here ("CANNOT be part of the below list of arguments"), but --connection (injected by the engine per lines 130–134) has no such restriction documented. A user including --connection in their arguments would conflict with the engine-injected value. Consider adding the same reservation for --connection.
Yes, very good point. I have updated the description to a list of restricted values including both --id and --connection. Thank you!
    }
    }

    enum SparkUDFShapes {
SparkUDFShapes uses plural naming, while UDFWorkerDataFormat in the same file uses singular. Proto convention recommends singular enum names — consider SparkUDFShape.
Good catch, thank you! Fixed.
    // produces iterator to a batch of rows as output.
    MAP_PARTITIONS = 2;
Grammar — "a iterator" and missing article:
    - // produces iterator to a batch of rows as output.
    + // UDF receives an iterator to a batch of rows as input and
    + // produces an iterator to a batch of rows as output.
      MAP_PARTITIONS = 2;
Missed this - thank you!
    // Which types of UDFs this worker supports.
    // This should list all supported Shapes.
    // Of which shape a specific UDF is will be communicated
Awkward phrasing:
    - // Of which shape a specific UDF is will be communicated
    + // The shape of a specific UDF will be communicated
    // After this time, the worker process should have terminated itself.
    // Otherwise, the process will be forcefully killed using SIGKILL.
    //
    // The engine will use this timeout, if it does not exceed a
    - // The engine will use this timeout, if it does not exceed a
    + // The engine will use this timeout, if it does not exceed an
    }
    }

     // Communication between the engine and worker
Leading space before // — inconsistent with all other message-level comments:
    -  // Communication between the engine and worker
    + // Communication between the engine and worker
    // is done using a UNIX domain socket.
    //
    // On [[UDFWorker]] creation, a path to a socket
    // to listen on is passed as a argument.
    - // to listen on is passed as a argument.
    + // to listen on is passed as an argument.
    // ["python3", "-m"]
    // ["worker.bin"]
    // ["java", "worker.java"]
    // ["bin/bash", "-c"]
Missing leading /:
    - // ["bin/bash", "-c"]
    + // ["/bin/bash", "-c"]
sven-weber-db left a comment
Adjusted according to review comments
    // The UDF execution type/shape.
    message UDFShape {
      oneof shape {
Why this indirection? The shape also has nothing to do with Spark...
I revisited this part with @haiyangsun-db. Instead of using a UDFShape, we now propose a UDFProtoCommunicationPattern. This captures which communication pattern of the UDFProto (to be added) is supported by this UDF worker. For all initial versions, this will be a bidirectional stream of bytes. However, for simpler use cases, we might add new communication modes, such as Request->Response, in the future.
The UDFShape will be covered via the client-specific init message. E.g., for Python UDFs, the Python client can tell the Python worker how to invoke the UDF (batch iterator, row-wise, etc.).
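A minimal sketch of the proposed replacement, assuming enum value names that are not spelled out in the thread:

```protobuf
// Which communication patterns of the UDF protocol (to be added)
// this worker supports. Value names below are hypothetical.
enum UDFProtoCommunicationPattern {
  UDF_PROTO_COMMUNICATION_PATTERN_UNSPECIFIED = 0;
  // All initial versions use a bidirectional stream of bytes.
  BIDIRECTIONAL_BYTE_STREAM = 1;
  // Simpler modes such as Request->Response may be added later.
}
```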
    // UDF receives a row with 0+ columns as input
    // and produces a single, scalar value as output
    EXPRESSION = 1;
Name it SCALAR or ONE_TO_ONE?
Deprecated according to the above comment
    // UDF receives an iterator to a batch of rows as input and
    // produces an iterator to a batch of rows as output.
    MAP_PARTITIONS = 2;
Deprecated according to the above comment
    // Examples:
    // /tmp/channel-uuid.sock
    // /some/system/path/channel-1234.sock
    message UnixDomainSocket {}
I am guessing you are going to add a path to this proto at some point?
No, the path is injected into the worker callable via the --connection parameter. All properties in this .proto are client-supplied, but the path should remain in control of the engine.
I added a separate message for the UDS for future extension points. For UDS itself, it is unlikely that we will ever need an extension. However, for TCP, we might need to specify additional properties in the future. To make the proto consistent, I propose using the same structure for all transport types.
    // Examples:
    // 8080
    // 1234
    message TcpConnection {}
If this is local only, then name it as such?
Same question as before. I assume you are going to add a port here?
Yes, good point. I renamed it LocalTcpConnection.
As for the port: This is the same as for the UDS. The port is engine-controlled and passed as a --connection flag to the UDF worker callable. This LocalTcpConnection message is there to capture additional TCP properties that the client might control in the future.
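Putting the two transport threads together, the messages might look like this. This is a sketch based on the author's replies; the field numbers are assumptions:

```protobuf
// The client chooses the transport; the concrete socket path or
// port stays engine-controlled and is passed to the worker via
// the --connection flag.
message WorkerConnection {
  oneof transport {
    UnixDomainSocket unix_domain_socket = 1;
    LocalTcpConnection local_tcp_connection = 2;
  }
}

// Empty today; kept as a message so UDS-specific properties
// could be added later without breaking the oneof.
message UnixDomainSocket {}

// Empty today; future client-controlled TCP properties could
// be added here.
message LocalTcpConnection {}
```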
    /// Worker specification
    ///
    message UDFWorkerSpecification {
      WorkerEnvironment environment = 1;
WorkerEnvironment looks like it is in the wrong place. It uses a lot of ProcessCallables which only seem to make sense for DirectWorker. Why not make it part of the direct worker to begin with?
The environment is not only needed for the DirectWorker. If we support a daemon-based worker creation in the future, this daemon would also need to be installed and set up in some way, which can be ensured through the environment. Therefore, I think this is not solely a property of the DirectWorker, but of all possible worker-creation modes we might support in the future.
Ok, let me ask a different question then: Are there going to be different ways in which a customer can set up a worker environment? If there are, then we should at least make this a oneof... (which we can do later on as well).
    }

    // Capabilities used for query planning
    message WorkerCapabilities {
IMO it would be a good idea to give this a somewhat self-describing name.
Do you have a concrete name in mind?
For me, Capabilities already captures the notion of this message pretty well. This message contains everything the engine needs to plan and execute a UDF using this worker. E.g., it describes what the UDF worker is capable of (which data formats are supported, whether it supports concurrency, etc.).
I did update the comment from

    // Capabilities used for query planning

to

    // Capabilities used for query planning
    // and running the worker during query execution.

to reflect that these properties are not only for query planning but also for execution.
    // - Any needed dependencies are present
    //
    // (Optional)
    optional ProcessCallable environment_verification = 2;
Document that verification failure means that we need installation. We should be very specific about the error codes here, so we can also add support for fatal conditions (e.g. no GPU, incompatible CPU arch, ...).
    // can subsequently locate and launch the worker process.
    //
    // (Required if environment_verification is given)
    optional ProcessCallable installation = 1;
Document that installation failure is fatal?
    // └─────────┘
    //
    // All scripts are optional.
    // However, if a verification script is supplied, an installation
Unless the verification script checks for a fatal condition.
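For context, the lifecycle fields under discussion fit together roughly like this. This is a sketch assembled from the quoted diff hunks; the failure-handling comments reflect the reviewers' open questions, not settled behavior:

```protobuf
message WorkerEnvironment {
  // Sets up the worker so the engine can subsequently locate
  // and launch the worker process.
  // Open question: installation failure is presumably fatal.
  // (Required if environment_verification is given)
  optional ProcessCallable installation = 1;

  // Verifies the environment, e.g. that any needed dependencies
  // are present. Open question: error codes should distinguish
  // "needs installation" from fatal conditions (no GPU,
  // incompatible CPU arch, ...).
  // (Optional)
  optional ProcessCallable environment_verification = 2;

  // Tears the environment down again. Open question: is this
  // also run after a failed installation?
  // (Optional)
  optional ProcessCallable environment_cleanup = 3;
}
```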
Merging this to master. We can address further concerns in follow-ups.
Yicong-Huang left a comment
Left some comments, mainly on protobuf defs and lifecycle problems. We can address them in follow-ups.
    // engine-configurable maximum time (e.g. 30 seconds).
    //
    // (Optional)
    optional int32 initialization_timeout_ms = 1;
Would the timeout be better as a uint to avoid negative values?
or maybe protobuf.Duration
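The google.protobuf.Duration alternative would look roughly like this. This is a sketch of the reviewer's suggestion, not the merged definition; the enclosing message is assumed:

```protobuf
import "google/protobuf/duration.proto";

message DirectWorker {
  // Well-known type instead of int32 milliseconds: it carries
  // its unit in the type and avoids hand-rolled unit suffixes.
  optional google.protobuf.Duration initialization_timeout = 1;
}
```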
    // (Optional)
    optional ProcessCallable environment_cleanup = 3;
When installation fails, do we also clean up the partially set-up environment?
    // engine implements more advanced resource management (TBD).
    //
    // (Optional)
    optional bool supports_concurrent_udfs = 3;
If not used right now, we can just reserve the field number (https://protobuf.dev/programming-guides/proto3/), e.g.:

    message Foo {
      reserved 2, 15, 9 to 11;
    }
    // ▼ ▼
    // ...
    // UDF worker creation
    // ...
It is not clear how installation failures are handled. Also (not related to this figure), it is not clear how worker creation failures are handled.
What changes were proposed in this pull request?
This PR introduces the protobuf definitions for the UDF worker specification described in SPIP SPARK-55278 and this design document.
Overall, two new .proto files are introduced:
- common.proto - Shared types and messages between the worker specification & the new UDF protocol (to be introduced)
- worker_spec.proto - UDF worker specification

Why are the changes needed?
This is the first step toward a language-agnostic UDF protocol for Spark that enables UDF workers written in any language to communicate with the Spark engine through a well-defined specification and API boundary. The abstractions introduced here establish the core contract that concrete implementations (e.g., process-based or gRPC-based workers) will build on.
The worker specification introduced in this PR captures all the information Spark needs to:
Does this PR introduce any user-facing change?
No. All new APIs are marked @experimental, and there are no behavioral changes to existing code.
How was this patch tested?
Was this patch authored or co-authored using generative AI tooling?
Yes, in an assistive manner and for reviews.